Define functions


In [56]:
import csv
import xml.etree.ElementTree as ET
from os import listdir
import re
import subprocess
from tempfile import mkdtemp
from glob import glob

# Attribute names that get_annotations_text looks up on the elements inside each <seg>.
target = 'target'
ana_name = 'ana'
# Separator written between talker and annotated text in the string (non-list) output.
TALKER_SEP = '_TALKER_'

def get_talker(all_talkers, indices, b):
    """Return the talker whose turn contains text offset ``b``.

    Args:
        all_talkers: Talker lines in order of appearance.
        indices: Start offset of each talker line, parallel to ``all_talkers``.
        b: Offset of the annotated fragment in the same text.

    Returns:
        The stripped talker line that most recently started at or before
        ``b``, or ``None`` when ``b`` precedes the first talker line or no
        talker lines are known.
    """
    for i, start in enumerate(indices):
        if start > b:
            # When i == 0 the offset lies before the first talker line.
            # Indexing with i - 1 == -1 would wrongly return the LAST talker.
            return all_talkers[i - 1].strip() if i > 0 else None
    # No talker line starts after b, so b falls inside the last talker's turn.
    if indices:
        return all_talkers[len(indices) - 1].strip()
    return None

def get_annotations_text(root, text, as_list=False):
    """Collect the annotated text fragments of one document, grouped by category.

    Args:
        root: Root element of the parsed annotation XML (TEI namespace).
        text: Document text that the character offsets in the XML refer to.
        as_list: If true, each category maps to a list of ``(talker, text)``
            tuples; otherwise to a single string to which every annotation
            is appended as ``"\\t" + talker + TALKER_SEP + text``.

    Returns:
        dict keyed by category name (the ``fsDescr`` text of each ``fsDecl``).
        Categories without annotations map to ``[]`` or ``''``.

    Raises:
        KeyError: if an ``fs`` element has no ``xml:id``, or an annotated
            ``fs`` has a ``type`` for which no ``fsDecl`` description exists.
    """
    tei = '{http://www.tei-c.org/ns/1.0}'
    xml_id = '{http://www.w3.org/XML/1998/namespace}id'

    # A talker line is a whole line ending with ':'. The length bounds apply
    # to the matched string (including its surrounding newlines).
    all_talkers = [
        line for line in re.findall('\n.*:\n', text)
        if 2 < len(line) <= 40
    ]

    # Locate every talker line in order. Each search starts just after the
    # previous hit, so a talker line that repeats gets its own offset instead
    # of the offset of its earlier occurrence.
    talkers_indices = []
    search_from = 0
    for name in all_talkers:
        found = text.find(name, search_from)
        talkers_indices.append(found)
        search_from = found + 1

    # annotation id -> list of text fragments / talker of the first fragment
    texts = {}
    talkers = {}
    for seg in root.iter(tei + 'seg'):
        ana = ''
        for node in seg.iter():
            attr = node.attrib
            if ana_name in attr:
                # Drop the first character (presumably a leading '#' - TODO confirm).
                ana = attr[ana_name][1:]
            if target in attr:
                # The offsets are the "begin,end" pair after the '=' sign.
                b, e = (attr[target].split('=')[1]).split(",")
                fragment = text[int(b):int(e) + 1]
                if len(fragment) >= 1:
                    if ana in texts:
                        texts[ana].append(fragment)
                    else:
                        texts[ana] = [fragment]
                        talkers[ana] = get_talker(all_talkers, talkers_indices, int(b))

    # category type id -> category name; the first description seen wins.
    cats = {}
    for decl in root.iter(tei + 'fsDecl'):
        decl_type = decl.attrib.get('type')
        if decl_type is None or decl_type in cats:
            continue
        for descr in decl.iter(tei + 'fsDescr'):
            cats[decl_type] = descr.text
            break

    annotations_text = {name: ([] if as_list else '') for name in cats.values()}

    for fs in root.iter(tei + 'fs'):
        fs_id = fs.attrib[xml_id]
        if 'type' not in fs.attrib or fs_id not in texts:
            continue
        anno_cat = cats[fs.attrib['type']]
        anno_text = ''.join(texts[fs_id])
        # "?" marks annotations for which no talker could be determined.
        anno_talker = "?"
        if talkers[fs_id] is not None:
            anno_talker = talkers[fs_id].strip()
        if as_list:
            annotations_text[anno_cat].append((anno_talker, anno_text))
        else:
            annotations_text[anno_cat] += "\t" + anno_talker + TALKER_SEP + anno_text
    return annotations_text

def get_annotations(annotation_path, as_list=False, with_text=False):
    """Load one annotation path and return its annotations grouped by category.

    Expects exactly one ``*.txt`` file directly under ``annotation_path`` and
    exactly one ``*.xml`` file one directory level below it; an assertion
    fails otherwise.

    Args:
        annotation_path: Path containing the text file and the XML file.
        as_list: Passed through to ``get_annotations_text``.
        with_text: If true, also return the text that was used.

    Returns:
        The annotations dict, or ``(annotations, text)`` when ``with_text``
        is true. The returned text has a space inserted after every newline.
    """
    def single_match(pattern):
        # Resolve a glob pattern that must match exactly one file.
        matches = glob(pattern.format(annotation_path))
        assert len(matches) == 1
        return matches[0]

    text_filename = single_match('{}/*.txt')
    xml_filename = single_match('{}/*/*.xml')

    with open(text_filename, 'r', encoding='utf-8') as file:
        # A space is added after each newline before offsets are applied
        # (presumably to line the text up with the XML offsets - TODO confirm).
        text = file.read().replace("\n", '\n ')

    root = ET.parse(xml_filename).getroot()
    annotations = get_annotations_text(root, text, as_list=as_list)
    return (annotations, text) if with_text else annotations

Catma is used by Bar Ilan University (BIU) to do manual classification / tagging of protocol parts

Original protocol files are uploaded to Catma, which parses them into text; BIU then manually tags the text according to certain predefined tags (related to law).

To use this notebook, export the corpus from Catma and provide the resulting .tar.gz file as input.


In [48]:
# Path of the exported corpus archive (.tar.gz) that the next cell extracts.
corpus_filename = '/pipelines/data/catma/ההסדרים_אקראיים1909171124.tar.gz'

In [49]:
# Unpack the corpus archive into a fresh temporary directory.
# NOTE: the directory is not removed here; later cells read from it.
corpus_dir = mkdtemp()
# Pass the command as an argument list (no shell) so that quotes, spaces or
# other shell metacharacters in either path cannot break or alter the command.
subprocess.check_call(['tar', '-xzvf', corpus_filename, '-C', corpus_dir])
# Every entry two levels below the extraction root is one annotation path.
annotation_paths = glob('{}/*/*'.format(corpus_dir))
for i, p in enumerate(annotation_paths):
    print(i, p)


0 /tmp/tmpi5w8l9e9/ההסדרים_אקראיים/פרוטוקול_1_2009-06-17._parts.txt
1 /tmp/tmpi5w8l9e9/ההסדרים_אקראיים/פרוטוקול_מס_10.txt
2 /tmp/tmpi5w8l9e9/ההסדרים_אקראיים/פרוטוקול_52_26.12.07.txt
3 /tmp/tmpi5w8l9e9/ההסדרים_אקראיים/פרוטוקול_מס'_35.txt
4 /tmp/tmpi5w8l9e9/ההסדרים_אקראיים/parts_פרוטוקול_37_2009-07-08-01.txt
5 /tmp/tmpi5w8l9e9/ההסדרים_אקראיים/פרוטוקול_21_2009-07-05-06.txt

Choose an annotation path to get annotations from


In [50]:
# Use the first listed annotation path; change the index to inspect another one.
annotation_path = annotation_paths[0]
annotation_path


Out[50]:
'/tmp/tmpi5w8l9e9/ההסדרים_אקראיים/פרוטוקול_1_2009-06-17._parts.txt'

In [51]:
# as_list=True gives, for each category, a list of (talker, text) tuples.
get_annotations(annotation_path, as_list=True)


Out[51]:
{'Judicial decision': [],
 'constitutional turns': [('היו"ר זאב אלקין:',
   'ובתיאום עם אנשי האופוזיציה קבענו את לוח הזמנים שיאפשר לכולם את זכות הביטוי. '),
  ('נסים זאב:',
   'היא מוותרת על זכות ההדיבור, וזאת זכותה, אנחנו לא יכולים לכפות עליה גם לדבר. בדמוקרטיה, כמו שיש חופש דיבור, גם אי-אפשר לכפות עעל אדם לדבר בכוח. ')],
 'Doubt': [],
 'Anticipating Judicial Review': []}

Get annotation statistics


In [68]:
from dataflows import Flow, printer

# Category names expected in the corpus. get_annotation_file_stats asserts that
# every category it encounters is listed here, and uses this list for the
# per-file and per-year count columns.
known_categories = [
    'Judicial decision',
    'constitutional turns',
    'Doubt',
    'Anticipating Judicial Review'
]

def get_year(text):
    """Return the first four-digit ``20xx`` substring of ``text`` as a string.

    Args:
        text: Text to search.

    Returns:
        The first match of ``20`` followed by two digits, e.g. ``'2009'``.

    Raises:
        IndexError: if ``text`` contains no such substring.
    """
    # re.search stops at the first match instead of collecting every match
    # in the whole text just to take the first one.
    match = re.search('[2][0][0-9][0-9]', text)
    if match is None:
        raise IndexError('no 20xx year found in text')
    return match.group(0)

# year -> {category: count}; filled as a side effect while
# get_annotation_file_stats is iterated, then read by get_yearly_counts.
yearly_counts = {}

def get_annotation_file_stats(annotation_paths):
    """Yield one row of per-category annotation counts for each annotation path.

    Each row holds ``year``, ``dirname`` (the path relative to ``corpus_dir``)
    and one count per entry of ``known_categories``. As a side effect the
    counts are also added to the module-level ``yearly_counts`` under the
    row's year.
    """
    for path in annotation_paths:
        annotations, text = get_annotations(path, as_list=True, with_text=True)
        year = get_year(text)

        totals = yearly_counts.get(year)
        if not totals:
            totals = {c: 0 for c in known_categories}
            yearly_counts[year] = totals

        row = {
            'year': year,
            'dirname': path.replace(corpus_dir, '').strip('/'),
        }
        # Start every known category at zero so absent categories still get a column.
        row.update({c: 0 for c in known_categories})

        for category, category_annotations in annotations.items():
            assert category in known_categories
            count = len(category_annotations)
            row[category] = count
            totals[category] += count
        yield row

def get_yearly_counts():
    """Yield one row per year from ``yearly_counts``: the year plus its category totals."""
    for year in yearly_counts:
        row = {'year': year}
        row.update(yearly_counts[year])
        yield row
        
# The first generator yields one row per annotation path and fills yearly_counts;
# the second yields the per-year totals from yearly_counts. The recorded output
# shows them as two tables (res_1, res_2), rendered as HTML by printer.
# NOTE(review): the second generator relies on the first being fully consumed
# before it is read - confirm against dataflows' processing order.
Flow(
    get_annotation_file_stats(annotation_paths),
    get_yearly_counts(),
    printer(tablefmt='html')
).process()


res_1

# year (string)dirname (string) Judicial decision (integer) constitutional turns (integer) Doubt (integer) Anticipating Judicial Review (integer)
12009ההסדרים_אקראיים/פרוטוקול_1_2009-06-17._parts.txt 0200
22015ההסדרים_אקראיים/פרוטוקול_מס_10.txt 0000
32007ההסדרים_אקראיים/פרוטוקול_52_26.12.07.txt 0001
42013ההסדרים_אקראיים/פרוטוקול_מס'_35.txt 0000
52009ההסדרים_אקראיים/parts_פרוטוקול_37_2009-07-08-01.txt0000
62009ההסדרים_אקראיים/פרוטוקול_21_2009-07-05-06.txt 0000

res_2

# year (string) Judicial decision (integer) constitutional turns (integer) Doubt (integer) Anticipating Judicial Review (integer)
120090200
220150000
320070001
420130000
Out[68]:
(<datapackage.package.Package at 0x7f0636d9cef0>, {})